package com.isyscore.os.metadata.service.executor;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.isyscore.os.core.exception.DataFactoryException;
import com.isyscore.os.core.util.ApplicationUtils;
import com.isyscore.os.metadata.enums.TaskPeriodUnit;
import lombok.extern.slf4j.Slf4j;

import java.time.LocalDateTime;
import java.util.concurrent.*;

import static com.isyscore.os.core.exception.ErrorCode.JOB_IS_RUNNING;

@Slf4j
public abstract class BaseExecutor implements Runnable {
    private static final double THRESHOLD = Double.parseDouble(ApplicationUtils.getProperty("task.threshold"));
    private static LocalDateTime logTime = LocalDateTime.now();
    private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5,
            Integer.parseInt(ApplicationUtils.getProperty("task.threadPool.maxSize")),
            60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(1), new ThreadFactoryBuilder().setNameFormat("TASK-RUN-POOL-%d").build(),
            new ThreadPoolExecutor.AbortPolicy());

    private static final DelayQueue<DelayTask<BaseExecutor>> delayQueue = new DelayQueue<>();

    /**
     * 消费延迟任务
     */
    static {
        startDelayedTaskProcessor();
    }

    /**
     * @param immediate 是否立即执行
     */
    public void execute(boolean immediate) {
        //启动任务时如果指定的流程定义当前已经存在一条正在运行的任务实例，则直接报错
        if (jobIsRunning()) {
            throw new DataFactoryException(JOB_IS_RUNNING);
        }
        synchronized (BaseExecutor.class){//为了避免并发提交的情况下线程活动数获取不准的情况
            log.info("线程池使用率:"+(threadPool.getActiveCount()+1) / Double.valueOf(threadPool.getMaximumPoolSize()));
            if ((threadPool.getActiveCount()+1) / Double.valueOf(threadPool.getMaximumPoolSize()) > THRESHOLD && !immediate) {
                delayQueue.add(new DelayTask(getTaskPeriodUnit().getDelayTime() * 1000, this));
                log.info("触发任务延迟");
                threadPoolStatus();
            } else {
                threadPool.submit(this);
            }
        }
    }


    @Override
    public void run() {
        runJob();
    }

    public static void threadPoolStatus() {
        if (logTime.plusMinutes(1L).isBefore(LocalDateTime.now())) {
            log.info("当前活动线程数：" + threadPool.getActiveCount());
            log.info("核心线程数：" + threadPool.getCorePoolSize());
            log.info("总线程数：" + threadPool.getPoolSize());
            log.info("最大线程池数量" + threadPool.getMaximumPoolSize());
            log.info("线程处理队列长度" + threadPool.getQueue().size());
            logTime = LocalDateTime.now();
        }
    }

    /**
     * 启用单线程消费延迟队列中的任务
     */
    private static void startDelayedTaskProcessor() {
        Thread t = new Thread(() -> {
            try {
                while (true) {
                    DelayTask<BaseExecutor> delayedTask = delayQueue.take();
                    delayedTask.getTask().execute(true);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        t.setName("DelayedTaskCustomer");
        t.start();
    }

    abstract protected void runJob();

    abstract protected boolean jobIsRunning();

    abstract TaskPeriodUnit getTaskPeriodUnit();

}
